Automated Test: kafka-consumer-parallel-after #319
Conversation
One potential problem we have with batch processing is that any one slow item will clog up the whole batch. This PR implements a queueing method instead, where we keep N queues that each have their own workers. There's still a chance of individual items backlogging a queue, but we can try increased concurrency here to reduce the chances of that happening.
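As a rough illustration of the idea (not the PR's actual implementation — class names like `FixedQueuePool` come from the PR's diagram, but the sketch below is an assumption), routing by a hash of the group key keeps each group's items on a single queue while letting other groups keep flowing past a slow item:

```python
import queue
import threading
from collections.abc import Callable
from typing import Any


class SimpleQueuePool:
    """Toy sketch: route items to one of N queues by group key so that items
    for the same group stay ordered, while different groups can make progress
    even if one item is slow."""

    def __init__(self, num_queues: int, process: Callable[[str, Any], None]) -> None:
        self.queues: list[queue.Queue] = [queue.Queue() for _ in range(num_queues)]
        self.process = process
        self.workers = [
            threading.Thread(target=self._worker, args=(q,), daemon=True)
            for q in self.queues
        ]
        for w in self.workers:
            w.start()

    def submit(self, group_key: str, item: Any) -> None:
        # The same group key always hashes to the same queue, preserving per-group order.
        self.queues[hash(group_key) % len(self.queues)].put((group_key, item))

    def _worker(self, q: queue.Queue) -> None:
        while True:
            group_key, item = q.get()
            self.process(group_key, item)
            q.task_done()


# Example: two queues; items for the same (hypothetical) monitor stay ordered on one queue.
pool = SimpleQueuePool(num_queues=2, process=lambda key, item: print(key, item))
pool.submit("monitor-a", {"status": "ok"})
pool.submit("monitor-b", {"status": "timeout"})
```

The per-group hash is what gives the per-group ordering guarantee described in the walkthrough below.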
📝 Walkthrough

The PR introduces a thread-queue-parallel processing mode for Kafka result consumption. It adds a modular in-process framework with per-partition offset tracking, multiple worker queues with per-group ordering guarantees, and integrates this into the results consumer factory. Comprehensive tests validate queue mechanics, offset management, ordering semantics, and end-to-end integration.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant SubmitHandler as submit()
    participant Decoder as decoder
    participant GroupingFn as grouping_fn
    participant QueuePool as FixedQueuePool
    participant WorkQueue as WorkQueue<br/>(per group)
    participant Worker as OrderedQueueWorker
    participant Processor as result_processor
    participant Tracker as OffsetTracker
    participant CommitLoop as _commit_loop
    participant Commit as commit_function

    Client->>SubmitHandler: submit(message)
    SubmitHandler->>Decoder: decode(payload)
    Decoder-->>SubmitHandler: result
    SubmitHandler->>GroupingFn: get_group_key(result)
    GroupingFn-->>SubmitHandler: group_key
    SubmitHandler->>Tracker: add_offset(partition, offset)
    SubmitHandler->>QueuePool: submit(group_key, WorkItem)
    QueuePool->>QueuePool: get_queue_for_group(group_key)
    QueuePool->>WorkQueue: put(WorkItem)
    Worker->>WorkQueue: get(WorkItem)
    Worker->>Processor: process(group_key, result)
    Processor-->>Worker: ✓
    Worker->>Tracker: complete_offset(partition, offset)
    CommitLoop->>CommitLoop: periodic timer
    CommitLoop->>Tracker: get_committable_offsets()
    Tracker-->>CommitLoop: dict[partition→offset]
    CommitLoop->>Commit: offsets_dict
    Commit->>Tracker: mark_committed(partition, offset)
```
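The two ends of that diagram can be sketched in a few lines. The names (`decoder`, `grouping_fn`, `OffsetTracker`, `result_processor`, `WorkItem`) are taken from the diagram; the function bodies are illustrative assumptions, not the PR's code:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class WorkItem:
    # Hypothetical shape; the PR's actual WorkItem may carry different fields.
    partition: int
    offset: int
    result: Any


def submit(message: Any, decoder, grouping_fn, offset_tracker, queue_pool) -> None:
    """Producer side of the diagram: decode, derive the group key, register the
    offset as outstanding, then enqueue onto the per-group work queue."""
    result = decoder.decode(message.payload)
    group_key = grouping_fn(result)
    offset_tracker.add_offset(message.partition, message.offset)
    queue_pool.submit(group_key, WorkItem(message.partition, message.offset, result))


def worker_loop(work_queue, result_processor, offset_tracker) -> None:
    """Consumer side of the diagram: process each item, then mark its offset
    complete so the commit loop can advance the committable watermark."""
    while True:
        group_key, item = work_queue.get()
        result_processor(group_key, item.result)
        offset_tracker.complete_offset(item.partition, item.offset)
```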
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ❌ Failed checks (1 warning) | ✅ Passed checks (1 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/sentry/remote_subscriptions/consumers/queue_consumer.py`:
- Around line 67-98: The get_committable_offsets implementation currently uses
range(start, max_offset+1) which can iterate over many absent offsets; replace
that range-based scan with an iteration over only the present offsets (sort the
partition's all_offsets and filter >= start) and detect contiguity by tracking
the next expected offset. Specifically, in get_committable_offsets, for each
partition use sorted_offsets = sorted(o for o in all_offsets if o >= start), set
expected = start, then loop sorted_offsets: if offset == expected and offset not
in outstanding update highest_committable and expected += 1 else break; keep
using last_committed, outstanding, all_offsets and highest_committable as
before.
🧹 Nitpick comments (3)

tests/sentry/uptime/consumers/test_results_consumer.py (2)

2017-2017: Consider using a `ClassVar` annotation for the class-level mutable attribute. Static analysis suggests annotating `pytestmark` with `ClassVar` for clarity.

```diff
+from typing import ClassVar
+
 class ProcessResultThreadQueueParallelKafkaTest(UptimeTestCase):
-    pytestmark = [requires_kafka]
+    pytestmark: ClassVar = [requires_kafka]
```

2142-2146: Log the exception during cleanup for better debugging. Silently catching exceptions during cleanup can mask issues during test debugging. Consider logging the exception.

🔧 Proposed fix

```diff
         finally:
             try:
                 admin_client.delete_topics([test_topic])
-            except Exception:
-                pass
+            except Exception as e:
+                logger.warning("Failed to delete test topic %s: %s", test_topic, e)
```

src/sentry/remote_subscriptions/consumers/queue_consumer.py (1)

344-345: The `timeout` parameter in `join()` is ignored. The `join` method ignores the `timeout` parameter and just calls `close()`. Looking at other processing strategies in the codebase (e.g., `SpanFlusher.join`), they typically honor the timeout parameter for graceful shutdown coordination. Consider passing the timeout through to the underlying operations:

🔧 Proposed fix

```diff
     def join(self, timeout: float | None = None) -> None:
-        self.close()
+        self.shutdown_event.set()
+        if timeout is not None:
+            self.commit_thread.join(timeout=timeout / 2)
+            # Use remaining time for queue pool
+            self.queue_pool.wait_until_empty(timeout=timeout / 2)
+        else:
+            self.commit_thread.join(timeout=5.0)
+        self.queue_pool.shutdown()
```
`src/sentry/remote_subscriptions/consumers/queue_consumer.py` (lines 67-98):

```python
def get_committable_offsets(self) -> dict[Partition, int]:
    """
    Get the highest offset per partition that can be safely committed.

    For each partition, finds the highest contiguous offset that has been processed.
    """
    committable = {}
    for partition in list(self.all_offsets.keys()):
        with self._get_partition_lock(partition):
            all_offsets = self.all_offsets[partition]
            if not all_offsets:
                continue

            outstanding = self.outstanding[partition]
            last_committed = self.last_committed.get(partition, -1)

            min_offset = min(all_offsets)
            max_offset = max(all_offsets)

            start = max(last_committed + 1, min_offset)

            highest_committable = last_committed
            for offset in range(start, max_offset + 1):
                if offset in all_offsets and offset not in outstanding:
                    highest_committable = offset
                else:
                    break

            if highest_committable > last_committed:
                committable[partition] = highest_committable

    return committable
```
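To see what the contiguity rule does in practice, here is the same scan pulled out as a standalone function and exercised on made-up data (an illustration only, not code from the PR):

```python
def highest_committable(all_offsets: set[int], outstanding: set[int], last_committed: int) -> int:
    """Standalone version of the scan above: walk forward from the last committed
    offset and stop at the first missing or still-outstanding offset."""
    if not all_offsets:
        return last_committed
    start = max(last_committed + 1, min(all_offsets))
    result = last_committed
    for offset in range(start, max(all_offsets) + 1):
        if offset in all_offsets and offset not in outstanding:
            result = offset
        else:
            break
    return result


# Offsets 10-14 were seen, 13 is still being processed, nothing committed yet:
# only 10-12 are safe to commit; 14 must wait until 13 completes.
assert highest_committable({10, 11, 12, 13, 14}, {13}, -1) == 12
# Once 13 finishes, the watermark can advance to 14.
assert highest_committable({10, 11, 12, 13, 14}, set(), 12) == 14
```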
🧩 Analysis chain
🌐 Web query:
Kafka partition offsets contiguous gaps sparsity
💡 Result:
Kafka offsets are per-partition, monotonically increasing log positions, but they are not guaranteed to be “dense” (i.e., you should not assume every integer offset corresponds to an application-visible record).
Why you can see gaps / sparsity in a partition’s offsets:
- Transactional control records (“commit/abort markers”) consume offsets but are not delivered to applications. Kafka writes control records into the log, and consumers use them to handle transactions; they “take up” offsets even though your app never sees them. [1]
- Aborted transactional records can be skipped (especially with `read_committed`). With `READ_COMMITTED`, clients are given enough information (e.g., aborted-transaction metadata; LSO/last stable offset concepts) to discard ABORTED transactional data, which can make offsets appear to jump. [2]
- Log compaction removes records but does not renumber offsets. After compaction, “missing” offsets remain valid positions; a fetch starting at a compacted-away offset returns the next higher offset that still exists. This creates long-lived holes in older parts of the log. [3]
- At-least-once / retries / duplicates don’t create gaps, but they break “offset == message count” assumptions. Extra physical records (including duplicates and transaction markers) mean offsets can advance faster than your perceived logical stream. [4]
Practical takeaway: treat offsets as opaque positions used for ordering and checkpointing, not as a contiguous sequence of “real messages.” The only safe assumption is ordering within a partition and uniqueness of offsets within that partition, not density. [3][4]
Sources:
[1] https://kafka.apache.org/41/implementation/message-format/
[2] https://kafka.apache.org/25/design/protocol/
[3] https://docs.confluent.io/kafka/design/log_compaction.html
[4] https://stackoverflow.com/questions/54636524/kafka-streams-does-not-increment-offset-by-1-when-producing-to-topic
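To illustrate that takeaway at the consumer API level, here is a minimal, hypothetical confluent-kafka sketch (broker address, group id, and topic are placeholders) that commits the next position after a processed message rather than assuming offsets are dense:

```python
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "uptime-results-example",   # hypothetical group id
        "enable.auto.commit": False,
        "isolation.level": "read_committed",    # aborted txn data is skipped; offsets may jump
    }
)
consumer.subscribe(["uptime-results"])          # hypothetical topic

msg = consumer.poll(5.0)
if msg is not None and msg.error() is None:
    # ... process the message ...
    # Commit the next position after this message; do not try to "count" records.
    consumer.commit(
        offsets=[TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)],
        asynchronous=False,
    )
consumer.close()
```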
Performance concern with sparse offset iteration.
The `get_committable_offsets` method iterates through `range(start, max_offset + 1)`, which could be expensive if offsets are very sparse. Kafka offsets are not guaranteed to be contiguous — gaps can occur due to transactional control records, aborted transactions, log compaction, and other factors. While the loop breaks on the first gap (limiting the iteration to the first contiguous block), partitions with significantly sparse offset regions could still impact performance. If this becomes a bottleneck, consider tracking offsets in a sorted data structure to avoid iterating through absent offsets.
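A sketch of that suggested direction (illustrative only; it keeps the semantics of the snippet above while visiting only offsets that actually exist):

```python
def highest_committable_sorted(
    all_offsets: set[int], outstanding: set[int], last_committed: int
) -> int:
    """Scan only the offsets that are present, in sorted order, and advance an
    expected counter to detect the first gap. Cost is O(k log k) in the number
    of tracked offsets rather than O(max_offset - start)."""
    if not all_offsets:
        return last_committed
    # Same starting point as the original: skip anything at or below the last
    # committed offset, but still allow jumping a leading gap.
    start = max(last_committed + 1, min(all_offsets))
    expected = start
    result = last_committed
    for offset in sorted(o for o in all_offsets if o >= start):
        if offset == expected and offset not in outstanding:
            result = offset
            expected += 1
        else:
            break
    return result


# With sparse offsets, only the present values are visited:
assert highest_committable_sorted({5, 1_000_000}, set(), -1) == 5
```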
This pull request was automatically created by @coderabbitai/e2e-reviewer. Batch created pull request.
Summary by CodeRabbit
Release Notes
New Features
Refactor
Tests